package com.markhsiu.minimq.broker.store.disk;

import java.io.File;
import java.io.FilenameFilter;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.markhsiu.minimq.core.thread.Scheduler;
import com.markhsiu.minimq.message.Message;

/**
 * Disk storage management.
 * <p>
 * Holds one {@link TopicDiskQueue} per topic in a process-wide map, hands a
 * task to the {@code Scheduler} that calls {@code sync()} on every queue and
 * removes one file queued for deletion per run, and closes everything on
 * {@link #destory()}.
 * 
 * @author Mark Hsiu
 *
 */
public class TopicDiskPool {

	private static final Logger logger = LoggerFactory.getLogger(TopicDiskPool.class);
	private static final String ROOT_PATH = DiskUtils.ROOT_PATH;
	/** Topic name to queue; shared by all static entry points. */
	private static final Map<String, TopicDiskQueue> queueMap = new ConcurrentHashMap<>();

	private final Scheduler scheduler = new Scheduler();
	/** Paths of block files waiting to be deleted by the background task. */
	private static final BlockingQueue<String> DELETING_QUEUE = new LinkedBlockingQueue<>();
	private static TopicDiskPool INSTANCE = new TopicDiskPool();

	private TopicDiskPool() {}

	/**
	 * Returns the live topic-to-queue map (not a copy); changes made by the
	 * caller are visible to the pool.
	 *
	 * @return the internal queue map
	 */
	public static Map<String, TopicDiskQueue> queues() {
		return queueMap;
	}

	/**
	 * Queues a file path for deletion by the background task or by
	 * {@link #destory()}.
	 *
	 * @param filePath path of the file to delete
	 */
	public static void toClear(String filePath) {
		DELETING_QUEUE.add(filePath);
	}

	/**
	 * Creates the singleton if needed and initializes it.
	 * <p>
	 * Synchronized on the class, like {@link #destory()}, so creation and
	 * teardown of the instance cannot interleave.
	 */
	public static synchronized void builder() {
		if (INSTANCE == null) {
			INSTANCE = new TopicDiskPool();
		}
		INSTANCE.init();
	}

	/**
	 * Shuts the pool down: stops the scheduler, closes and forgets every
	 * queue, and deletes all files still pending deletion. Does nothing if
	 * the pool is already destroyed.
	 */
	public synchronized static void destory() {
		if (INSTANCE != null) {
			INSTANCE.disposal();
			INSTANCE = null;
		}
	}

	/**
	 * Writes a message to the queue of its topic, creating the queue on
	 * first use.
	 *
	 * @param message the message to store; its topic selects the queue
	 */
	public static void putMsg(Message message) {
		String topic = message.getTopic();
		queueFor(topic).write(message);
	}

	/**
	 * Reads the next message of a topic.
	 *
	 * @param topic topic name
	 * @return the result of {@code read()} on the topic's queue, or
	 *         {@code null} when {@code topic} is {@code null} or no queue
	 *         exists for it
	 */
	public static Message pollMsg(String topic) {
		// ConcurrentHashMap rejects null keys; treat null like an unknown topic.
		if (topic == null) {
			return null;
		}
		TopicDiskQueue queue = queueMap.get(topic);
		if (queue == null) {
			return null;
		}
		return queue.read();
	}

	/**
	 * Registers queues for the index files already on disk and hands the
	 * sync/cleanup task to the scheduler.
	 * <p>
	 * NOTE(review): every call registers the task again; confirm against
	 * {@code Scheduler.putTask} whether repeated calls are harmless.
	 */
	public void init() {
		scanDir();
		scheduler.putTask(new Runnable() {

			@Override
			public void run() {
				for (TopicDiskQueue queue : queueMap.values()) {
					queue.sync();
				}
				deleteBlockFile();
			}
		});
	}

	/**
	 * Returns the queue registered for {@code topic}, creating and
	 * registering it when absent.
	 * <p>
	 * Creation happens under a lock with a second lookup so that concurrent
	 * callers never build two queues for the same topic and replace one
	 * another's entry.
	 */
	private static TopicDiskQueue queueFor(String topic) {
		TopicDiskQueue queue = queueMap.get(topic);
		if (queue == null) {
			synchronized (queueMap) {
				queue = queueMap.get(topic);
				if (queue == null) {
					queue = new TopicDiskQueue(ROOT_PATH, topic);
					queueMap.put(topic, queue);
				}
			}
		}
		return queue;
	}

	/**
	 * Takes at most one path from the deletion queue and deletes that file.
	 * Failures are logged, never thrown.
	 */
	private void deleteBlockFile() {
		String blockFilePath = DELETING_QUEUE.poll();
		if (StringUtils.isNotBlank(blockFilePath)) {
			File delFile = new File(blockFilePath);
			try {
				if (!delFile.delete()) {
					logger.warn("block file:{} delete failed", blockFilePath);
				}
			} catch (SecurityException e) {
				// Keep the cause in the log instead of dropping it.
				logger.error("security manager exists, delete denied", e);
			}
		}
	}

	/**
	 * Instance-side teardown used by {@link #destory()}: shuts the scheduler
	 * down, closes and clears all queues, then drains the deletion queue.
	 */
	private void disposal() {
		scheduler.shutdown();
		for (TopicDiskQueue queue : queueMap.values()) {
			queue.close();
		}
		queueMap.clear();
		while (!DELETING_QUEUE.isEmpty()) {
			deleteBlockFile();
		}
	}

	/**
	 * Ensures the root directory exists and registers a queue for every
	 * index file found in it.
	 *
	 * @throws IllegalArgumentException if the root path is not a directory
	 */
	private void scanDir() {
		File pathDir = new File(ROOT_PATH);
		if (!pathDir.exists()) {
			pathDir.mkdirs();
		}
		// Also covers a failed mkdirs(): the path is then still not a directory.
		if (!pathDir.isDirectory()) {
			throw new IllegalArgumentException("it is not a directory");
		}

		File[] indexFiles = pathDir.listFiles(new FilenameFilter() {
			@Override
			public boolean accept(File dir, String name) {
				return DiskUtils.isIndexFile(name);
			}
		});

		if (ArrayUtils.isNotEmpty(indexFiles)) {
			for (File indexFile : indexFiles) {
				String topic = DiskUtils.truncateTopicName(indexFile.getName());
				// Keep a queue that is already registered rather than
				// replacing it with a fresh one that was never closed.
				queueFor(topic);
			}
		}
	}

}
